Fix weixin oc disconnected #7039

Closed
GowayLee wants to merge 6 commits into AstrBotDevs:master from GowayLee:fix-weixin_oc-disconnected

@GowayLee GowayLee commented Mar 27, 2026

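Fixes #7022: weixin_oc disconnects and stops running after a brief network or proxy switch. Previously, WeixinOCAdapter.run() only kept polling when the inbound long poll raised asyncio.TimeoutError (a case already covered by #6915). Recoverable connection errors such as aiohttp.ServerDisconnectedError still broke out of the loop, which stopped the adapter permanently.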

Modifications

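  • In astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py, extended the exception recovery logic for inbound polling: when WeixinOCAdapter.run() catches aiohttp.ClientConnectionError, it records the most recent inbound error, logs a warning, closes the current client session, and resumes polling after a short wait.

  • Kept the existing fatal-error path for non-network exceptions, so that business-logic errors and other unexpected exceptions are not misclassified as recoverable and silently swallowed.

  • Added regression tests in tests/unit/test_weixin_oc_typing.py covering the following scenarios:

    • Polling recovers after aiohttp.ServerDisconnectedError
    • Non-network polling exceptions remain fatal
  • Follows the timeout recovery pattern from the already merged fix: keep weixin_oc polling after inbound timeouts #6915.

  • This is NOT a breaking change.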
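
The recovery behavior described in the list above can be sketched roughly as follows. This is an illustrative sketch and not the adapter's actual code: `ClientConnectionError` and `ServerDisconnectedError` are local stand-ins that mirror the aiohttp exception hierarchy (so the snippet runs without aiohttp installed), and `poll_forever`, `poll_once`, `reset_session`, and `retry_delay_s` are hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger("weixin_oc_sketch")


class ClientConnectionError(Exception):
    """Stand-in for aiohttp.ClientConnectionError (assumption for this demo)."""


class ServerDisconnectedError(ClientConnectionError):
    """Stand-in for aiohttp.ServerDisconnectedError."""


async def poll_forever(poll_once, reset_session, shutdown, retry_delay_s=2.0):
    """Poll until `shutdown` is set; return the last recoverable error text."""
    last_error = None
    while not shutdown.is_set():
        try:
            await poll_once()
        except asyncio.TimeoutError:
            continue  # long-poll timeout: simply poll again
        except ClientConnectionError as exc:
            last_error = str(exc)
            logger.warning("inbound polling connection error: %s", exc)
            await reset_session()  # drop the broken session before retrying
            await asyncio.sleep(retry_delay_s)
        # Any other exception propagates to the caller and stays fatal.
    return last_error
```

Catching the base connection error class covers `ServerDisconnectedError` as a subclass, while a `ValueError` raised inside `poll_once` would still escape the loop.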

Screenshots or Test Results

$ uv run pytest tests/unit/test_weixin_oc_typing.py
============================= test session starts ==============================
platform linux -- Python 3.12.11, pytest-9.0.2, pluggy-1.6.0
collected 25 items

tests/unit/test_weixin_oc_typing.py .........................            [100%]

======================== 25 passed, 3 warnings in 6.03s ========================

Checklist

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.

  • 😮 My changes do not introduce malicious code.

Summary by Sourcery

Add robust typing indicator support and connection recovery for the weixin_oc adapter, and ensure agent pipeline correctly manages typing lifecycle across normal and error flows.

New Features:

  • Introduce typing indicator support for weixin_oc conversations, including ticket management, keepalive, and cancellation per user and owner.
  • Expose platform-agnostic send_typing/stop_typing hooks on message events and integrate them into the internal agent sub-stage to signal LLM processing status to users.

Bug Fixes:

  • Make the weixin_oc adapter automatically recover from transient inbound polling connection errors such as aiohttp.ClientConnectionError instead of stopping permanently.

Enhancements:

  • Ensure typing-related background tasks are cleaned up before client shutdown and during adapter termination to avoid leaks and dangling state.
  • Adjust the internal agent sub-stage to always attempt to stop typing on completion or failure for non-streaming platforms while tolerating typing API failures.

Tests:

  • Add comprehensive unit tests for weixin_oc typing behavior, including ticket refresh, keepalive, cancellation semantics, cleanup ordering, and recovery from server disconnects.
  • Add unit tests for the internal agent sub-stage to verify correct typing lifecycle handling and resilience to send_typing/stop_typing errors.
  • Extend AstrMessageEvent tests to cover the default no-op stop_typing implementation.
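
The typing lifecycle described under Enhancements (always try to stop typing on completion or failure, while tolerating typing API failures) could look roughly like the sketch below. Only the `send_typing` and `stop_typing` hook names come from the summary above. `run_with_typing` and `handler` are hypothetical, and the real sub-stage may be structured differently.

```python
import asyncio
import logging

logger = logging.getLogger("typing_sketch")


async def run_with_typing(event, handler):
    """Show a typing indicator while `handler` runs; always try to clear it."""
    try:
        await event.send_typing()
    except Exception:
        # A failing typing API must not block the actual reply.
        logger.warning("send_typing failed; continuing", exc_info=True)
    try:
        return await handler()
    finally:
        try:
            await event.stop_typing()
        except Exception:
            # Do not mask the handler's result or exception.
            logger.warning("stop_typing failed; ignoring", exc_info=True)
```

Because the handlers catch `Exception` rather than `BaseException`, task cancellation still propagates normally.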

@auto-assign auto-assign Bot requested review from advent259141 and anka-afk March 27, 2026 03:12
@dosubot dosubot Bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Mar 27, 2026
@GowayLee GowayLee closed this Mar 27, 2026
@dosubot dosubot Bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Mar 27, 2026

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • The reconnect logic in run() currently sleeps for a fixed 2 seconds after any aiohttp.ClientConnectionError; consider using a simple backoff strategy (or making the delay configurable) so that repeated failures don't result in a tight reconnect loop or unnecessarily long pauses.
  • Typing session state in _typing_states is never pruned for users that no longer have owners/tasks; you may want to drop empty TypingSessionState entries once both owners and associated tasks are cleared to avoid unbounded growth over long-running processes.
Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py" line_range="53" />
<code_context>
     error: str | None = None


+@dataclass
+class TypingSessionState:
+    ticket: str | None = None
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the new typing-related logic into a dedicated manager and clarifying TypingSessionState transitions to keep the adapter simpler and the state machine easier to follow.

You can reduce the new complexity substantially by explicitly encapsulating the typing orchestration, while keeping behavior identical.

### 1. Extract a dedicated typing manager

Right now `WeixinOCAdapter` owns:

- `_typing_states`
- `TypingSessionState`
- all the orchestration methods (`_get_typing_state`, `_ensure_typing_ticket`, `_run_typing_keepalive`, `start_typing`, `stop_typing`, `_cleanup_typing_tasks`, …)

You can move these into a small helper class that the adapter delegates to. This keeps the adapter focused on login/polling/message handling and isolates the state machine.

Example sketch:

```python
@dataclass
class TypingSessionState:
    ticket: str | None = None
    ticket_context_token: str | None = None
    refresh_after: float = 0.0
    keepalive_task: asyncio.Task | None = None
    cancel_task: asyncio.Task | None = None
    owners: set[str] = field(default_factory=set)
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


class WeixinTypingManager:
    def __init__(
        self,
        client: WeixinOCClient,
        *,
        adapter_id: str,
        shutdown_event: asyncio.Event,
        keepalive_interval_s: int,
        ticket_ttl_s: int,
        context_tokens: dict[str, str],
    ) -> None:
        self._client = client
        self._adapter_id = adapter_id
        self._shutdown_event = shutdown_event
        self._keepalive_interval_s = keepalive_interval_s
        self._ticket_ttl_s = ticket_ttl_s
        self._context_tokens = context_tokens
        self._states: dict[str, TypingSessionState] = {}

    # move _get_typing_state, _typing_supported_for, _ensure_typing_ticket,
    # _send_typing_state, _run_typing_keepalive, _typing_keepalive_loop,
    # _delayed_cancel_typing, start_typing, stop_typing, _cleanup_typing_tasks
    # into methods here, switching self.client → self._client,
    # self.meta().id → self._adapter_id, self._typing_states → self._states, etc.
```

Adapter usage then becomes a small surface:

```python
class WeixinOCAdapter(Platform):
    def __init__(...):
        ...
        self._typing_manager = WeixinTypingManager(
            client=self.client,
            adapter_id=self.meta().id,
            shutdown_event=self._shutdown_event,
            keepalive_interval_s=self._typing_keepalive_interval_s,
            ticket_ttl_s=self._typing_ticket_ttl_s,
            context_tokens=self._context_tokens,
        )

    async def start_typing(self, user_id: str, owner_id: str) -> None:
        await self._typing_manager.start_typing(user_id, owner_id)

    async def stop_typing(self, user_id: str, owner_id: str) -> None:
        await self._typing_manager.stop_typing(user_id, owner_id)

    async def run(self) -> None:
        ...
        finally:
            await self._typing_manager.cleanup()
            await self.client.close()

    async def terminate(self) -> None:
        self._shutdown_event.set()
        await self._typing_manager.cleanup()
```

This is mostly mechanical (rename `self` references, pass dependencies via `__init__`) and does not change behavior, but it cuts the adapter’s responsibility surface significantly.

### 2. Localize state transitions on `TypingSessionState`

A lot of complexity comes from repeated patterns manipulating `owners`, `keepalive_task`, and `cancel_task` under locks. You can move these into small methods on `TypingSessionState` (or on the manager operating on a single state) to make the state machine more explicit and reduce repeated branching.

For example:

```python
@dataclass
class TypingSessionState:
    ...
    # Invariants:
    # - if owners is empty, keepalive_task is None
    # - cancel_task is only non-None while a cancel is scheduled but not sent/completed

    def has_active_owners(self) -> bool:
        return bool(self.owners)

    def add_owner(self, owner_id: str) -> bool:
        """Returns True if this transitioned from 0 → 1 owners."""
        before = len(self.owners)
        self.owners.add(owner_id)
        return before == 0 and len(self.owners) == 1

    def remove_owner(self, owner_id: str) -> bool:
        """Returns True if this transitioned from 1 → 0 owners."""
        if owner_id not in self.owners:
            return False
        self.owners.remove(owner_id)
        return not self.owners
```

Then `start_typing` / `stop_typing` logic becomes easier to follow:

```python
async def start_typing(self, user_id: str, owner_id: str) -> None:
    state = self._get_typing_state(user_id)
    cancel_task: asyncio.Task | None = None

    async with state.lock:
        if not self._typing_supported_for(user_id):
            return
        first_owner = state.add_owner(owner_id)
        if not first_owner:
            # already active; nothing else to do
            return
        # remaining logic unchanged, but now you know this is the 0→1 transition
```

```python
async def stop_typing(self, user_id: str, owner_id: str) -> None:
    state = self._states.get(user_id)
    if state is None:
        return

    task: asyncio.Task | None = None
    async with state.lock:
        last_owner = state.remove_owner(owner_id)
        if not last_owner:
            return
        task = state.keepalive_task
        state.keepalive_task = None
    ...
```

This doesn’t change semantics but encodes the important transitions (`0→1`, `1→0`) explicitly, so you don’t need to mentally reconstruct them from raw set length checks in multiple places.
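
To make the two transitions concrete, here is a reduced, runnable sketch of the owner bookkeeping alone, with no tasks, tickets, or locks. `OwnerSet`, `states`, `start`, and `stop` are hypothetical names. The pruning step in `stop` is an assumption added to illustrate the overall comment about dropping empty entries, and a real implementation would also need to confirm that no keepalive or cancel task is still attached before deleting the entry.

```python
from dataclasses import dataclass, field


@dataclass
class OwnerSet:
    """Reduced stand-in for TypingSessionState: owner bookkeeping only."""

    owners: set[str] = field(default_factory=set)

    def add_owner(self, owner_id: str) -> bool:
        """Return True only on the 0 -> 1 transition."""
        was_empty = not self.owners
        self.owners.add(owner_id)
        return was_empty

    def remove_owner(self, owner_id: str) -> bool:
        """Return True only on the 1 -> 0 transition."""
        if owner_id not in self.owners:
            return False
        self.owners.remove(owner_id)
        return not self.owners


states: dict[str, OwnerSet] = {}


def start(user_id: str, owner_id: str) -> bool:
    """Register an owner; True means typing should actually be started."""
    return states.setdefault(user_id, OwnerSet()).add_owner(owner_id)


def stop(user_id: str, owner_id: str) -> bool:
    """Unregister an owner; True means typing should actually be stopped."""
    state = states.get(user_id)
    if state is None or not state.remove_owner(owner_id):
        return False
    del states[user_id]  # prune the empty entry so the dict cannot grow forever
    return True
```

With two owners for the same user, only the first `start` and the last `stop` return `True`, which is where the keepalive would be launched and torn down.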

### 3. Use `_cancel_task_safely` consistently

You already created `_cancel_task_safely`, but some places still cancel tasks directly and await later. Centralizing the pattern makes the async lifecycle easier to reason about and eliminates subtle differences.

Example in `_cleanup_typing_tasks`:

```python
async def _cleanup_typing_tasks(self) -> None:
    tasks: list[asyncio.Task] = []
    cancels: list[tuple[str, str]] = []

    for user_id, state in list(self._states.items()):
        ...
        if state.keepalive_task is not None:
            tasks.append(state.keepalive_task)
            state.keepalive_task = None
        if state.cancel_task is not None:
            tasks.append(state.cancel_task)
            state.cancel_task = None

    for task in tasks:
        await self._cancel_task_safely(
            task,
            log_message="weixin_oc(%s): typing cleanup failed",
            log_args=(self._adapter_id,),
        )
```

If you move the logic into the manager, it’s easy to ensure that any place that cancels tasks uses this helper, instead of mixing direct `task.cancel()` calls with separate awaiting logic.
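
For reference, a helper of this kind is often written along the following lines. The PR's actual `_cancel_task_safely` body is not shown in this review, so this standalone `cancel_task_safely` is only an assumed shape that reuses the `log_message` and `log_args` parameters from the example above.

```python
import asyncio
import logging

logger = logging.getLogger("weixin_oc_sketch")


async def cancel_task_safely(task, *, log_message, log_args=()):
    """Cancel `task`, wait for it to finish, and log any unexpected failure."""
    if not task.done():
        task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # the task ended by cancellation, as requested
    except Exception:
        # The task had already failed (or failed while unwinding); record it.
        logger.warning(log_message, *log_args, exc_info=True)
```

Awaiting the task after cancelling it is what guarantees that its cleanup code has finished before the caller moves on, for example before closing the client session.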

---

These steps keep all current behavior (including error handling and retry semantics) but:

- decouple typing from the adapter,
- make the per-user state machine explicit,
- and concentrate task/lock orchestration in a single, smaller component.
</issue_to_address>



@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements a typing indicator mechanism across the platform, with a specific implementation for the weixin_oc adapter. Key changes include the addition of a stop_typing method to the base message event, state management for typing sessions (including keep-alive and delayed cancellation) in the WeChat adapter, and integration into the internal processing pipeline to ensure typing status is correctly managed during LLM requests. Additionally, the PR includes error handling for connection issues in the WeChat adapter and comprehensive unit tests for the new typing logic. I have no feedback to provide.

@GowayLee GowayLee deleted the fix-weixin_oc-disconnected branch March 27, 2026 03:15